package com.jie.flink.cdc.flinksource.config;

import com.google.common.collect.Lists;
import org.apache.commons.lang3.ArrayUtils;
import org.apache.commons.lang3.StringUtils;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * 数据源配置清洗类
 * @author zhanggj
 * @date 2023/6/6 22:19
 * @desc
 */
public class SourceConfigWashMapper {
    /**
     * 真实表名和逻辑表名对应关系
     */
    public static final Map<String, String> ACTUAL_LOGICAL_TABLE_NAME = new HashMap<>();

    private final static String LEFT_BRACKET = "[";
    private final static String RIGHT_BRACKET = "]";
    private final static String TABLE_NAME_HYPHEN = "_";
    private final static String TABLE_NAME_RANGE_HYPHEN = "~";
    private final static String TABLE_NAME_LEVEL_HYPHEN = ".";

    /**
     * 监听表名清洗服务
     * 针对分区（分表）表监听配置数据进行清洗，分区（表）表名配置如下
     * patrol_point[_0~1023]、patrol_equip[2016~2038]
     * @param tableArray
     * @return
     */
    public static String[] tableNameWash(String[] tableArray) {
        if (ArrayUtils.isEmpty(tableArray)) {
            return null;
        }
        final List<String> actualTableNameList = new ArrayList<>(tableArray.length * 2);
        for (int i = 0; i < tableArray.length; i++) {
            final String tableName = tableArray[i];
            // 判断是否包含表达式
            if (!StringUtils.contains(tableName, LEFT_BRACKET)
                    || !StringUtils.contains(tableName, RIGHT_BRACKET)) {
                actualTableNameList.add(tableName);
                continue;
            }
            actualTableNameList.addAll(analysisTableName(tableName));
        }
        return actualTableNameList.toArray(new String[actualTableNameList.size()]);
    }

    private static List<String> analysisTableName(final String tableName) {
        // 获取表达式
        final String expression = StringUtils.substringBetween(tableName, LEFT_BRACKET, RIGHT_BRACKET);
        if (StringUtils.isBlank(expression)) {
            return Lists.newArrayList(tableName);
        }
        final String logicalTableName = StringUtils.substringBefore(tableName, LEFT_BRACKET);
        final String actualTableNameStart;
        final String[] rangeNumArray = expression.split(TABLE_NAME_RANGE_HYPHEN);
        int numStart = 0;
        int numEnd = Integer.parseInt(rangeNumArray[1]) + 1;
        if (expression.startsWith(TABLE_NAME_HYPHEN)) {
            numStart = Integer.parseInt(StringUtils.substring(rangeNumArray[0], TABLE_NAME_HYPHEN.length()));
            actualTableNameStart = logicalTableName.concat(TABLE_NAME_HYPHEN);
        } else {
            numStart = Integer.parseInt(rangeNumArray[0]);
            actualTableNameStart = logicalTableName;
        }
        final List<String> actualTableNameList = new ArrayList<>(numEnd - numStart);
        for (int i = numStart; i < numEnd; i++) {
            final String actualTableName = actualTableNameStart.concat(String.valueOf(i));
            actualTableNameList.add(actualTableName);
            ACTUAL_LOGICAL_TABLE_NAME.put(removeSchema(actualTableName), removeSchema(logicalTableName));
        }

        return actualTableNameList;
    }

    private static String removeSchema(final String tableName) {
        if (!StringUtils.contains(tableName, TABLE_NAME_LEVEL_HYPHEN)) {
            return tableName;
        }
        return StringUtils.substringAfter(tableName, TABLE_NAME_LEVEL_HYPHEN);
    }
}
